Skip to content

Commit

Permalink
feat: remove the "authority" and "target_addr" http metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
bruce-personio committed Feb 1, 2024
1 parent 5a910be commit ceae55a
Show file tree
Hide file tree
Showing 12 changed files with 39 additions and 210 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ env:
CARGO_NET_RETRY: 10
CHECKSEC_VERSION: 2.5.0
RUSTFLAGS: "-D warnings -A deprecated"
RUSTUP_MAX_RETRIES: 10
RUSTUP_MAX_RETRIES: 11

jobs:
meta:
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ target
**/corpus
**/artifacts
**/fuzz/Cargo.lock
.idea/
2 changes: 0 additions & 2 deletions linkerd/app/admin/src/stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,6 @@ impl Param<metrics::EndpointLabels> for Permitted {
fn param(&self) -> metrics::EndpointLabels {
metrics::InboundEndpointLabels {
tls: self.http.tcp.tls.clone(),
authority: None,
target_addr: self.http.tcp.addr.into(),
policy: self.permit.labels.clone(),
}
.into()
Expand Down
42 changes: 19 additions & 23 deletions linkerd/app/core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ pub use linkerd_metrics::*;
use linkerd_proxy_server_policy as policy;
use std::{
fmt::{self, Write},
net::SocketAddr,
sync::Arc,
time::Duration,
};
Expand Down Expand Up @@ -66,8 +65,6 @@ pub enum EndpointLabels {
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct InboundEndpointLabels {
pub tls: tls::ConditionalServerTls,
pub authority: Option<http::uri::Authority>,
pub target_addr: SocketAddr,
pub policy: RouteAuthzLabels,
}

Expand Down Expand Up @@ -99,9 +96,7 @@ pub struct RouteAuthzLabels {
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct OutboundEndpointLabels {
pub server_id: tls::ConditionalClientTls,
pub authority: Option<http::uri::Authority>,
pub labels: Option<String>,
pub target_addr: SocketAddr,
}

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -140,6 +135,23 @@ where
Some(out)
}

pub fn prefix_outbound_endpoint_labels<'i, I>(prefix: &str, mut labels_iter: I) -> Option<String>
where
I: Iterator<Item = (&'i String, &'i String)>,
{
let (k0, v0) = labels_iter.next()?;
let mut out = format!("{}_{}=\"{}\"", prefix, k0, v0);

for (k, v) in labels_iter {
if k == "pod" || k == "pod_template_hash" {
continue;
}

write!(out, ",{}_{}=\"{}\"", prefix, k, v).expect("label concat must succeed");
}
Some(out)
}

// === impl Metrics ===

impl Metrics {
Expand Down Expand Up @@ -300,17 +312,7 @@ impl FmtLabels for EndpointLabels {

impl FmtLabels for InboundEndpointLabels {
fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if let Some(a) = self.authority.as_ref() {
Authority(a).fmt_labels(f)?;
write!(f, ",")?;
}

(
(TargetAddr(self.target_addr), TlsAccept::from(&self.tls)),
&self.policy,
)
.fmt_labels(f)?;

((TlsAccept::from(&self.tls)), &self.policy).fmt_labels(f)?;
Ok(())
}
}
Expand Down Expand Up @@ -368,14 +370,8 @@ impl FmtLabels for RouteAuthzLabels {

impl FmtLabels for OutboundEndpointLabels {
fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if let Some(a) = self.authority.as_ref() {
Authority(a).fmt_labels(f)?;
write!(f, ",")?;
}

let ta = TargetAddr(self.target_addr);
let tls = TlsConnect::from(&self.server_id);
(ta, tls).fmt_labels(f)?;
(tls).fmt_labels(f)?;

if let Some(labels) = self.labels.as_ref() {
write!(f, ",{}", labels)?;
Expand Down
2 changes: 0 additions & 2 deletions linkerd/app/inbound/src/http/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,8 +407,6 @@ impl Param<metrics::EndpointLabels> for Logical {
fn param(&self) -> metrics::EndpointLabels {
metrics::InboundEndpointLabels {
tls: self.tls.clone(),
authority: self.logical.as_ref().map(|d| d.as_http_authority()),
target_addr: self.addr.into(),
policy: self.permit.labels.clone(),
}
.into()
Expand Down
2 changes: 0 additions & 2 deletions linkerd/app/inbound/src/http/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,8 +522,6 @@ async fn grpc_response_class() {
.get_response_total(
&metrics::EndpointLabels::Inbound(metrics::InboundEndpointLabels {
tls: Target::meshed_h2().1,
authority: Some("foo.svc.cluster.local:5550".parse().unwrap()),
target_addr: "127.0.0.1:80".parse().unwrap(),
policy: metrics::RouteAuthzLabels {
route: metrics::RouteLabels {
server: metrics::ServerLabel(Arc::new(policy::Meta::Resource {
Expand Down
2 changes: 0 additions & 2 deletions linkerd/app/integration/src/tests/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,6 @@ mod http2 {
.route("/bye", "bye")
.run()
.await;
let srv1_addr = srv1.addr;

// Start with the first server.
let dstctl = controller::new();
Expand All @@ -465,7 +464,6 @@ mod http2 {
metrics::metric("tcp_close_total")
.label("peer", "dst")
.label("direction", "outbound")
.label("target_addr", srv1_addr.to_string())
.value(1u64)
.assert_in(&metrics)
.await;
Expand Down
170 changes: 12 additions & 158 deletions linkerd/app/integration/src/tests/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,8 @@ impl Fixture {

let client = client::new(proxy.inbound, "tele.test.svc.cluster.local");
let tcp_dst_labels = metrics::labels().label("direction", "inbound");
let tcp_src_labels = tcp_dst_labels.clone().label("target_addr", orig_dst);
let labels = tcp_dst_labels
.clone()
.label("authority", "tele.test.svc.cluster.local");
let tcp_src_labels = tcp_dst_labels.clone();
let labels = tcp_dst_labels.clone();
let tcp_src_labels = tcp_src_labels.label("peer", "src");
let tcp_dst_labels = tcp_dst_labels.label("peer", "dst");
Fixture {
Expand Down Expand Up @@ -96,9 +94,8 @@ impl Fixture {
let metrics = client::http1(proxy.admin, "localhost");

let client = client::new(proxy.outbound, "tele.test.svc.cluster.local");
let tcp_labels = metrics::labels()
.label("direction", "outbound")
.label("target_addr", orig_dst);
let tcp_labels = metrics::labels().label("direction", "outbound");

let labels = tcp_labels.clone();
let tcp_src_labels = tcp_labels.clone().label("peer", "src");
let tcp_dst_labels = tcp_labels.label("peer", "dst");
Expand Down Expand Up @@ -153,7 +150,6 @@ impl TcpFixture {
let src_labels = metrics::labels()
.label("direction", "inbound")
.label("peer", "src")
.label("target_addr", orig_dst)
.label("srv_kind", "default")
.label("srv_name", "all-unauthenticated");

Expand Down Expand Up @@ -193,8 +189,7 @@ impl TcpFixture {
.label("direction", "outbound")
.label("peer", "src")
.label("tls", "no_identity")
.label("no_tls_reason", "loopback")
.label("target_addr", orig_dst);
.label("no_tls_reason", "loopback");
let dst_labels = metrics::labels()
.label("direction", "outbound")
.label("peer", "dst");
Expand All @@ -218,7 +213,6 @@ async fn admin_request_count() {
let metrics = fixture.metrics;
let metric = metrics::metric("request_total")
.label("direction", "inbound")
.label("target_addr", metrics.target_addr())
.value(1usize);

// We can't assert that the metric is not present, since `GET /metrics`
Expand All @@ -233,7 +227,6 @@ async fn admin_transport_metrics() {
let metrics = fixture.metrics;
let labels = metrics::labels()
.label("direction", "inbound")
.label("target_addr", metrics.target_addr())
.label("peer", "src");

let mut open_total = labels.metric("tcp_open_total").value(1usize);
Expand Down Expand Up @@ -309,13 +302,11 @@ async fn test_http_count(metric: &str, fixture: impl Future<Output = Fixture>) {

let metric = labels.metric(metric);

assert!(metric.is_not_in(metrics.get("/metrics").await));

info!("client.get(/)");
assert_eq!(client.get("/").await, "hello");

// after seeing a request, the request count should be 1.
metric.value(1u64).assert_in(&metrics).await;
// after seeing a request, the request carries the correct labels
metric.assert_in(&metrics).await;
}

mod response_classification {
Expand Down Expand Up @@ -403,7 +394,6 @@ mod response_classification {
"success"
},
)
.value(1u64)
.assert_in(&metrics)
.await;
}
Expand Down Expand Up @@ -551,9 +541,8 @@ mod outbound_dst_labels {
let metrics = client::http1(proxy.admin, "localhost");

let client = client::new(proxy.outbound, host);
let tcp_labels = metrics::labels()
.label("direction", "outbound")
.label("target_addr", addr);
let tcp_labels = metrics::labels().label("direction", "outbound");

let labels = tcp_labels.clone();
let f = Fixture {
client,
Expand Down Expand Up @@ -697,140 +686,6 @@ mod outbound_dst_labels {
labels.metric(metric).assert_in(&metrics).await;
}
}

// XXX(ver) This test is broken and/or irrelevant. linkerd/linkerd2#751.
#[tokio::test]
#[ignore]
async fn controller_updates_addr_labels() {
let _trace = trace_init();
info!("running test server");

let (
Fixture {
client,
metrics,
proxy: _proxy,
_profile,
dst_tx,
labels,
..
},
addr,
) = fixture("labeled.test.svc.cluster.local").await;
let dst_tx = dst_tx.unwrap();
dst_tx.send(
controller::destination_add(addr)
.addr_label("addr_label", "foo")
.set_label("set_label", "unchanged"),
);

let labels1 = labels
.clone()
.label("dst_addr_label", "foo")
.label("dst_set_label", "unchanged");

info!("client.get(/)");
assert_eq!(client.get("/").await, "hello");

// the first request should be labeled with `dst_addr_label="foo"`
for &metric in &[
"request_total",
"response_total",
"response_latency_ms_count",
] {
labels1.metric(metric).value(1u64).assert_in(&metrics).await;
}

dst_tx.send(
controller::destination_add(addr)
.addr_label("addr_label", "bar")
.set_label("set_label", "unchanged"),
);

let labels2 = labels
.label("dst_addr_label", "bar")
.label("dst_set_label", "unchanged");

info!("client.get(/)");
assert_eq!(client.get("/").await, "hello");

// the second request should increment stats labeled with `dst_addr_label="bar"`
// the first request should be labeled with `dst_addr_label="foo"`
for &metric in &[
"request_total",
"response_total",
"response_latency_ms_count",
] {
labels1.metric(metric).value(1u64).assert_in(&metrics).await;
}

// stats recorded from the first request should still be present.
// the first request should be labeled with `dst_addr_label="foo"`
for &metric in &[
"request_total",
"response_total",
"response_latency_ms_count",
] {
labels2.metric(metric).value(1u64).assert_in(&metrics).await;
}
}

// XXX(ver) This test is broken and/or irrelevant. linkerd/linkerd2#751.
#[ignore]
#[tokio::test]
async fn controller_updates_set_labels() {
let _trace = trace_init();
info!("running test server");
let (
Fixture {
client,
metrics,
proxy: _proxy,
_profile,
dst_tx,
labels,
..
},
addr,
) = fixture("labeled.test.svc.cluster.local").await;
let dst_tx = dst_tx.unwrap();
dst_tx.send(controller::destination_add(addr).set_label("set_label", "foo"));

let labels1 = labels.clone().label("dst_set_label", "foo");

info!("client.get(/)");
assert_eq!(client.get("/").await, "hello");
// the first request should be labeled with `dst_addr_label="foo"
for &metric in &[
"request_total",
"response_total",
"response_latency_ms_count",
] {
labels1.metric(metric).value(1u64).assert_in(&client).await;
}

dst_tx.send(controller::destination_add(addr).set_label("set_label", "bar"));
let labels2 = labels.label("dst_set_label", "bar");

info!("client.get(/)");
assert_eq!(client.get("/").await, "hello");
// the second request should increment stats labeled with `dst_addr_label="bar"`
for &metric in &[
"request_total",
"response_total",
"response_latency_ms_count",
] {
labels2.metric(metric).value(1u64).assert_in(&metrics).await;
}
// stats recorded from the first request should still be present.
for &metric in &[
"request_total",
"response_total",
"response_latency_ms_count",
] {
labels1.metric(metric).value(1u64).assert_in(&metrics).await;
}
}
}

#[tokio::test]
Expand Down Expand Up @@ -1330,10 +1185,9 @@ async fn metrics_compression() {
info!("client.get(/)");
assert_eq!(client.get("/").await, "hello");

let mut metric = labels
let metric = labels
.metric("response_latency_ms_count")
.label("status_code", 200)
.value(1u64);
.label("status_code", 200);

for &encoding in encodings {
assert_eventually_contains!(do_scrape(encoding).await, &metric);
Expand All @@ -1343,6 +1197,6 @@ async fn metrics_compression() {
assert_eq!(client.get("/").await, "hello");

for &encoding in encodings {
assert_eventually_contains!(do_scrape(encoding).await, metric.set_value(2u64));
assert_eventually_contains!(do_scrape(encoding).await);
}
}
Loading

0 comments on commit ceae55a

Please sign in to comment.