Skip to content

Commit

Permalink
Feature-flagged support for in kubeconfig
Browse files Browse the repository at this point in the history
Signed-off-by: Razz4780 <[email protected]>
  • Loading branch information
Razz4780 committed Oct 12, 2023
1 parent c3fbe25 commit b76575c
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 123 deletions.
4 changes: 3 additions & 1 deletion kube-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@ client = ["config", "__non_core", "hyper", "http-body", "tower", "tower-http", "
jsonpatch = ["kube-core/jsonpatch"]
admission = ["kube-core/admission"]
config = ["__non_core", "pem", "home"]
socks5 = ["hyper-socks2"]

# private feature sets; do not use
__non_core = ["tracing", "serde_yaml", "base64"]

[package.metadata.docs.rs]
features = ["client", "rustls-tls", "openssl-tls", "ws", "oauth", "oidc", "jsonpatch", "admission", "k8s-openapi/latest"]
features = ["client", "rustls-tls", "openssl-tls", "ws", "oauth", "oidc", "jsonpatch", "admission", "k8s-openapi/latest", "socks5"]
# Define the configuration attribute `docsrs`. Used to enable `doc_cfg` feature.
rustdoc-args = ["--cfg", "docsrs"]

Expand All @@ -59,6 +60,7 @@ jsonpath_lib = { version = "0.3.0", optional = true }
tokio-util = { version = "0.7.0", optional = true, features = ["io", "codec"] }
hyper = { version = "0.14.13", optional = true, features = ["client", "http1", "stream", "tcp"] }
hyper-rustls = { version = "0.24.0", optional = true }
hyper-socks2 = { version = "0.8.0", optional = true, default-features = false }
tokio-tungstenite = { version = "0.20.0", optional = true }
tower = { version = "0.4.13", optional = true, features = ["buffer", "filter", "util"] }
tower-http = { version = "0.4.0", optional = true, features = ["auth", "map-response-body", "trace"] }
Expand Down
244 changes: 135 additions & 109 deletions kube-client/src/client/builder.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
use bytes::Bytes;
use http::{Request, Response};
use hyper::{self, client::HttpConnector};
use http::{header::HeaderMap, Request, Response};
use hyper::{
self,
client::{connect::Connection, HttpConnector},
};
use hyper_timeout::TimeoutConnector;
pub use kube_core::response::Status;
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite};
use tower::{util::BoxService, BoxError, Layer, Service, ServiceBuilder};
use tower_http::{
classify::ServerErrorsFailureClass, map_response_body::MapResponseBodyLayer, trace::TraceLayer,
};
use tracing::Span;

use crate::{client::ConfigExt, Client, Config, Error, Result};

Expand Down Expand Up @@ -61,117 +67,137 @@ impl<Svc> ClientBuilder<Svc> {
}
}

impl TryFrom<Config> for ClientBuilder<BoxService<Request<hyper::Body>, Response<Box<DynBody>>, BoxError>> {
pub type GenericService = BoxService<Request<hyper::Body>, Response<Box<DynBody>>, BoxError>;

impl TryFrom<Config> for ClientBuilder<GenericService> {
type Error = Error;

/// Builds a default [`ClientBuilder`] stack from a given configuration
fn try_from(config: Config) -> Result<Self> {
use std::time::Duration;

use http::header::HeaderMap;
use tracing::Span;

let default_ns = config.default_namespace.clone();
let auth_layer = config.auth_layer()?;

let client: hyper::Client<_, hyper::Body> = {
let mut connector = HttpConnector::new();
connector.enforce_http(false);

// Current TLS feature precedence when more than one are set:
// 1. rustls-tls
// 2. openssl-tls
// Create a custom client to use something else.
// If TLS features are not enabled, http connector will be used.
#[cfg(feature = "rustls-tls")]
let connector = config.rustls_https_connector_with_connector(connector)?;
#[cfg(all(not(feature = "rustls-tls"), feature = "openssl-tls"))]
let connector = config.openssl_https_connector_with_connector(connector)?;
#[cfg(all(not(feature = "rustls-tls"), not(feature = "openssl-tls")))]
if auth_layer.is_none() || config.cluster_url.scheme() == Some(&http::uri::Scheme::HTTPS) {
// no tls stack situation only works on anonymous auth with http scheme
return Err(Error::TlsRequired);
}

let mut connector = TimeoutConnector::new(connector);

// Set the timeouts for the client
connector.set_connect_timeout(config.connect_timeout);
connector.set_read_timeout(config.read_timeout);
connector.set_write_timeout(config.write_timeout);

hyper::Client::builder().build(connector)
};

let stack = ServiceBuilder::new().layer(config.base_uri_layer()).into_inner();
#[cfg(feature = "gzip")]
let stack = ServiceBuilder::new()
.layer(stack)
.layer(tower_http::decompression::DecompressionLayer::new())
.into_inner();

let service = ServiceBuilder::new()
.layer(stack)
.option_layer(auth_layer)
.layer(config.extra_headers_layer()?)
.layer(
// Attribute names follow [Semantic Conventions].
// [Semantic Conventions]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md
TraceLayer::new_for_http()
.make_span_with(|req: &Request<hyper::Body>| {
tracing::debug_span!(
"HTTP",
http.method = %req.method(),
http.url = %req.uri(),
http.status_code = tracing::field::Empty,
otel.name = req.extensions().get::<&'static str>().unwrap_or(&"HTTP"),
otel.kind = "client",
otel.status_code = tracing::field::Empty,
)
})
.on_request(|_req: &Request<hyper::Body>, _span: &Span| {
tracing::debug!("requesting");
})
.on_response(|res: &Response<hyper::Body>, _latency: Duration, span: &Span| {
let status = res.status();
span.record("http.status_code", status.as_u16());
if status.is_client_error() || status.is_server_error() {
span.record("otel.status_code", "ERROR");
}
})
// Explicitly disable `on_body_chunk`. The default does nothing.
.on_body_chunk(())
.on_eos(|_: Option<&HeaderMap>, _duration: Duration, _span: &Span| {
tracing::debug!("stream closed");
})
.on_failure(|ec: ServerErrorsFailureClass, _latency: Duration, span: &Span| {
// Called when
// - Calling the inner service errored
// - Polling `Body` errored
// - the response was classified as failure (5xx)
// - End of stream was classified as failure
let mut connector = HttpConnector::new();
connector.enforce_http(false);

#[cfg(feature = "socks5")]
if let Some(proxy_addr) = config.proxy_url.clone() {
let connector = hyper_socks2::SocksConnector {
proxy_addr,
auth: None,
connector,
};

return make_generic_builder(connector, config);
}

make_generic_builder(connector, config)
}
}

/// Helper function for implementation of [`TryFrom<Config>`] for [`ClientBuilder`].
/// Ignores [`Config::proxy_url`], which at this point is already handled.
fn make_generic_builder<H>(base_connector: H, config: Config) -> Result<ClientBuilder<GenericService>, Error>
where
H: 'static + Clone + Send + Sync + Service<http::Uri>,
H::Response: 'static + Connection + AsyncRead + AsyncWrite + Send + Unpin,
H::Future: 'static + Send,
H::Error: 'static + Send + Sync + std::error::Error,
{
let default_ns = config.default_namespace.clone();
let auth_layer = config.auth_layer()?;

let client: hyper::Client<_, hyper::Body> = {

Check warning on line 107 in kube-client/src/client/builder.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/client/builder.rs#L107

Added line #L107 was not covered by tests
// Current TLS feature precedence when more than one are set:
// 1. rustls-tls
// 2. openssl-tls
// Create a custom client to use something else.
// If TLS features are not enabled, http connector will be used.
#[cfg(feature = "rustls-tls")]
let connector = config.rustls_https_connector_with_connector(base_connector)?;
#[cfg(all(not(feature = "rustls-tls"), feature = "openssl-tls"))]
let connector = config.openssl_https_connector_with_connector(base_connector)?;
#[cfg(all(not(feature = "rustls-tls"), not(feature = "openssl-tls")))]
if auth_layer.is_none() || config.cluster_url.scheme() == Some(&http::uri::Scheme::HTTPS) {

Check warning on line 118 in kube-client/src/client/builder.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/client/builder.rs#L116-L118

Added lines #L116 - L118 were not covered by tests
// no tls stack situation only works on anonymous auth with http scheme
return Err(Error::TlsRequired);

Check warning on line 120 in kube-client/src/client/builder.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/client/builder.rs#L120

Added line #L120 was not covered by tests
}

let mut connector = TimeoutConnector::new(connector);

// Set the timeouts for the client
connector.set_connect_timeout(config.connect_timeout);
connector.set_read_timeout(config.read_timeout);
connector.set_write_timeout(config.write_timeout);

hyper::Client::builder().build(connector)
};

let stack = ServiceBuilder::new().layer(config.base_uri_layer()).into_inner();
#[cfg(feature = "gzip")]
let stack = ServiceBuilder::new()
.layer(stack)
.layer(tower_http::decompression::DecompressionLayer::new())

Check warning on line 137 in kube-client/src/client/builder.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/client/builder.rs#L135-L137

Added lines #L135 - L137 were not covered by tests
.into_inner();

let service = ServiceBuilder::new()
.layer(stack)
.option_layer(auth_layer)
.layer(config.extra_headers_layer()?)
.layer(
// Attribute names follow [Semantic Conventions].
// [Semantic Conventions]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md
TraceLayer::new_for_http()
.make_span_with(|req: &Request<hyper::Body>| {
tracing::debug_span!(
"HTTP",

Check warning on line 150 in kube-client/src/client/builder.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/client/builder.rs#L150

Added line #L150 was not covered by tests
http.method = %req.method(),
http.url = %req.uri(),
http.status_code = tracing::field::Empty,

Check warning on line 153 in kube-client/src/client/builder.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/client/builder.rs#L153

Added line #L153 was not covered by tests
otel.name = req.extensions().get::<&'static str>().unwrap_or(&"HTTP"),
otel.kind = "client",
otel.status_code = tracing::field::Empty,

Check warning on line 156 in kube-client/src/client/builder.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/client/builder.rs#L155-L156

Added lines #L155 - L156 were not covered by tests
)
})
.on_request(|_req: &Request<hyper::Body>, _span: &Span| {
tracing::debug!("requesting");
})
.on_response(|res: &Response<hyper::Body>, _latency: Duration, span: &Span| {
let status = res.status();
span.record("http.status_code", status.as_u16());
if status.is_client_error() || status.is_server_error() {
span.record("otel.status_code", "ERROR");
match ec {
ServerErrorsFailureClass::StatusCode(status) => {
span.record("http.status_code", status.as_u16());
tracing::error!("failed with status {}", status)
}
ServerErrorsFailureClass::Error(err) => {
tracing::error!("failed with error {}", err)
}
}
}),
)
.service(client);

Ok(Self::new(
BoxService::new(
MapResponseBodyLayer::new(|body| {
Box::new(http_body::Body::map_err(body, BoxError::from)) as Box<DynBody>
}
})
.layer(service),
),
default_ns,
))
}
// Explicitly disable `on_body_chunk`. The default does nothing.
.on_body_chunk(())
.on_eos(|_: Option<&HeaderMap>, _duration: Duration, _span: &Span| {
tracing::debug!("stream closed");

Check warning on line 172 in kube-client/src/client/builder.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/client/builder.rs#L170-L172

Added lines #L170 - L172 were not covered by tests
})
.on_failure(|ec: ServerErrorsFailureClass, _latency: Duration, span: &Span| {

Check warning on line 174 in kube-client/src/client/builder.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/client/builder.rs#L174

Added line #L174 was not covered by tests
// Called when
// - Calling the inner service errored
// - Polling `Body` errored
// - the response was classified as failure (5xx)
// - End of stream was classified as failure
span.record("otel.status_code", "ERROR");
match ec {
ServerErrorsFailureClass::StatusCode(status) => {
span.record("http.status_code", status.as_u16());
tracing::error!("failed with status {}", status)

Check warning on line 184 in kube-client/src/client/builder.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/client/builder.rs#L180-L184

Added lines #L180 - L184 were not covered by tests
}
ServerErrorsFailureClass::Error(err) => {
tracing::error!("failed with error {}", err)

Check warning on line 187 in kube-client/src/client/builder.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/client/builder.rs#L186-L187

Added lines #L186 - L187 were not covered by tests
}
}
}),
)
.service(client);

Ok(ClientBuilder::new(
BoxService::new(
MapResponseBodyLayer::new(|body| {
Box::new(http_body::Body::map_err(body, BoxError::from)) as Box<DynBody>
})
.layer(service),
),
default_ns,
))
}
37 changes: 25 additions & 12 deletions kube-client/src/client/config_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ pub trait ConfigExt: private::Sealed {
/// ```
#[cfg_attr(docsrs, doc(cfg(feature = "rustls-tls")))]
#[cfg(feature = "rustls-tls")]
fn rustls_https_connector_with_connector(
fn rustls_https_connector_with_connector<H>(
&self,
connector: hyper::client::HttpConnector,
) -> Result<hyper_rustls::HttpsConnector<hyper::client::HttpConnector>>;
connector: H,
) -> Result<hyper_rustls::HttpsConnector<H>>;

/// Create [`rustls::ClientConfig`] based on config.
/// # Example
Expand Down Expand Up @@ -118,10 +118,16 @@ pub trait ConfigExt: private::Sealed {
/// ```
#[cfg_attr(docsrs, doc(cfg(feature = "openssl-tls")))]
#[cfg(feature = "openssl-tls")]
fn openssl_https_connector_with_connector(
fn openssl_https_connector_with_connector<H>(
&self,
connector: hyper::client::HttpConnector,
) -> Result<hyper_openssl::HttpsConnector<hyper::client::HttpConnector>>;
connector: H,
) -> Result<hyper_openssl::HttpsConnector<H>>
where
H: tower::Service<http::Uri> + Send,
H::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
H::Future: Send + 'static,
H::Response:
tokio::io::AsyncRead + tokio::io::AsyncWrite + hyper::client::connect::Connection + Unpin;

/// Create [`openssl::ssl::SslConnectorBuilder`] based on config.
/// # Example
Expand Down Expand Up @@ -215,10 +221,10 @@ impl ConfigExt for Config {
}

#[cfg(feature = "rustls-tls")]
fn rustls_https_connector_with_connector(
fn rustls_https_connector_with_connector<H>(
&self,
connector: hyper::client::HttpConnector,
) -> Result<hyper_rustls::HttpsConnector<hyper::client::HttpConnector>> {
connector: H,
) -> Result<hyper_rustls::HttpsConnector<H>> {
let rustls_config = self.rustls_client_config()?;
let mut builder = hyper_rustls::HttpsConnectorBuilder::new()
.with_tls_config(rustls_config)
Expand All @@ -245,10 +251,17 @@ impl ConfigExt for Config {
}

#[cfg(feature = "openssl-tls")]
fn openssl_https_connector_with_connector(
fn openssl_https_connector_with_connector<H>(
&self,
connector: hyper::client::HttpConnector,
) -> Result<hyper_openssl::HttpsConnector<hyper::client::HttpConnector>> {
connector: H,
) -> Result<hyper_openssl::HttpsConnector<H>>
where
H: tower::Service<http::Uri> + Send,
H::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
H::Future: Send + 'static,
H::Response:
tokio::io::AsyncRead + tokio::io::AsyncWrite + hyper::client::connect::Connection + Unpin,
{
let mut https =
hyper_openssl::HttpsConnector::with_connector(connector, self.openssl_ssl_connector_builder()?)
.map_err(|e| Error::OpensslTls(tls::openssl_tls::Error::CreateHttpsConnector(e)))?;
Expand Down
3 changes: 2 additions & 1 deletion kube/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ admission = ["kube-core/admission"]
derive = ["kube-derive", "kube-core/schema"]
runtime = ["kube-runtime"]
unstable-runtime = ["kube-runtime/unstable-runtime"]
socks5 = ["kube-client/socks5"]

[package.metadata.docs.rs]
features = ["client", "rustls-tls", "openssl-tls", "derive", "ws", "oauth", "jsonpatch", "admission", "runtime", "k8s-openapi/latest", "unstable-runtime"]
features = ["client", "rustls-tls", "openssl-tls", "derive", "ws", "oauth", "jsonpatch", "admission", "runtime", "k8s-openapi/latest", "unstable-runtime", "socks5"]
# Define the configuration attribute `docsrs`. Used to enable `doc_cfg` feature.
rustdoc-args = ["--cfg", "docsrs"]

Expand Down

0 comments on commit b76575c

Please sign in to comment.