Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ env:
CARGO_INCREMENTAL: 0
CARGO_NET_RETRY: 10
RUSTFLAGS: "-D warnings -A deprecated --cfg tokio_unstable"
RUSTUP_MAX_RETRIES: 10
RUSTUP_MAX_RETRIES: 12

concurrency:
group: ${{ github.workflow }}-${{ inputs.ref || github.head_ref }}
Expand Down Expand Up @@ -135,7 +135,7 @@ jobs:
- os: windows
arch: arm64
- os: windows
arch: arm
arch: arm

# If we're not actually building on a release tag, don't short-circuit on
# errors. This helps us know whether a failure is platform-specific.
Expand Down
2 changes: 0 additions & 2 deletions linkerd/app/admin/src/stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,6 @@ impl Param<metrics::EndpointLabels> for Permitted {
fn param(&self) -> metrics::EndpointLabels {
metrics::InboundEndpointLabels {
tls: self.http.tcp.tls.clone(),
authority: None,
target_addr: self.http.tcp.addr.into(),
policy: self.permit.labels.clone(),
}
.into()
Expand Down
25 changes: 2 additions & 23 deletions linkerd/app/core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use linkerd_proxy_server_policy as policy;
use prometheus_client::encoding::{EncodeLabelSet, EncodeLabelValue};
use std::{
fmt::{self, Write},
net::SocketAddr,
sync::Arc,
time::Duration,
};
Expand Down Expand Up @@ -66,8 +65,6 @@ pub enum EndpointLabels {
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct InboundEndpointLabels {
pub tls: tls::ConditionalServerTls,
pub authority: Option<http::uri::Authority>,
pub target_addr: SocketAddr,
pub policy: RouteAuthzLabels,
}

Expand Down Expand Up @@ -99,10 +96,8 @@ pub struct RouteAuthzLabels {
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct OutboundEndpointLabels {
pub server_id: tls::ConditionalClientTls,
pub authority: Option<http::uri::Authority>,
pub labels: Option<String>,
pub zone_locality: OutboundZoneLocality,
pub target_addr: SocketAddr,
}

#[derive(Debug, Copy, Clone, Default, Hash, Eq, PartialEq, EncodeLabelValue)]
Expand Down Expand Up @@ -317,17 +312,7 @@ impl FmtLabels for EndpointLabels {

impl FmtLabels for InboundEndpointLabels {
fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if let Some(a) = self.authority.as_ref() {
Authority(a).fmt_labels(f)?;
write!(f, ",")?;
}

(
(TargetAddr(self.target_addr), TlsAccept::from(&self.tls)),
&self.policy,
)
.fmt_labels(f)?;

((TlsAccept::from(&self.tls)), &self.policy).fmt_labels(f)?;
Ok(())
}
}
Expand Down Expand Up @@ -409,14 +394,8 @@ impl svc::Param<OutboundZoneLocality> for OutboundEndpointLabels {

impl FmtLabels for OutboundEndpointLabels {
fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if let Some(a) = self.authority.as_ref() {
Authority(a).fmt_labels(f)?;
write!(f, ",")?;
}

let ta = TargetAddr(self.target_addr);
let tls = TlsConnect::from(&self.server_id);
(ta, tls).fmt_labels(f)?;
(tls).fmt_labels(f)?;

if let Some(labels) = self.labels.as_ref() {
write!(f, ",{}", labels)?;
Expand Down
4 changes: 0 additions & 4 deletions linkerd/app/inbound/src/http/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,10 +396,6 @@ fn endpoint_labels(
move |t: &Logical| -> metrics::EndpointLabels {
metrics::InboundEndpointLabels {
tls: t.tls.clone(),
authority: unsafe_authority_labels
.then(|| t.logical.as_ref().map(|d| d.as_http_authority()))
.flatten(),
target_addr: t.addr.into(),
policy: t.permit.labels.clone(),
}
.into()
Expand Down
2 changes: 0 additions & 2 deletions linkerd/app/integration/src/tests/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,14 +454,12 @@ mod http2 {
// Simulate the first server falling over without discovery
// knowing about it...
tracing::info!(%alpha.addr, "Stopping");
let alpha_addr = alpha.addr;
alpha.join().await;

// Wait until the proxy has seen the `alpha` disconnect...
metrics::metric("tcp_close_total")
.label("peer", "dst")
.label("direction", "outbound")
.label("target_addr", alpha_addr.to_string())
.value(1u64)
.assert_in(&metrics)
.await;
Expand Down
167 changes: 9 additions & 158 deletions linkerd/app/integration/src/tests/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ impl Fixture {

let client = client::new(proxy.inbound, "tele.test.svc.cluster.local");
let tcp_dst_labels = metrics::labels().label("direction", "inbound");
let tcp_src_labels = tcp_dst_labels.clone().label("target_addr", orig_dst);
let labels = tcp_dst_labels.clone().label("target_port", orig_dst.port());
let tcp_src_labels = tcp_dst_labels.clone();
let labels = tcp_dst_labels.clone();
let tcp_src_labels = tcp_src_labels.label("peer", "src");
let tcp_dst_labels = tcp_dst_labels.label("peer", "dst");
Fixture {
Expand Down Expand Up @@ -94,9 +94,7 @@ impl Fixture {
let metrics = client::http1(proxy.admin, "localhost");

let client = client::new(proxy.outbound, "tele.test.svc.cluster.local");
let tcp_labels = metrics::labels()
.label("direction", "outbound")
.label("target_addr", orig_dst);
let tcp_labels = metrics::labels().label("direction", "outbound")
let labels = tcp_labels.clone();
let tcp_src_labels = tcp_labels.clone().label("peer", "src");
let tcp_dst_labels = tcp_labels.label("peer", "dst");
Expand Down Expand Up @@ -151,7 +149,6 @@ impl TcpFixture {
let src_labels = metrics::labels()
.label("direction", "inbound")
.label("peer", "src")
.label("target_addr", orig_dst)
.label("srv_kind", "default")
.label("srv_name", "all-unauthenticated");

Expand Down Expand Up @@ -191,8 +188,7 @@ impl TcpFixture {
.label("direction", "outbound")
.label("peer", "src")
.label("tls", "no_identity")
.label("no_tls_reason", "loopback")
.label("target_addr", orig_dst);
.label("no_tls_reason", "loopback");
let dst_labels = metrics::labels()
.label("direction", "outbound")
.label("peer", "dst");
Expand All @@ -216,7 +212,6 @@ async fn admin_request_count() {
let metrics = fixture.metrics;
let metric = metrics::metric("request_total")
.label("direction", "inbound")
.label("target_addr", metrics.target_addr())
.value(1usize);

// We can't assert that the metric is not present, since `GET /metrics`
Expand All @@ -231,7 +226,6 @@ async fn admin_transport_metrics() {
let metrics = fixture.metrics;
let labels = metrics::labels()
.label("direction", "inbound")
.label("target_addr", metrics.target_addr())
.label("peer", "src");

let mut open_total = labels.metric("tcp_open_total").value(1usize);
Expand Down Expand Up @@ -306,18 +300,13 @@ async fn test_http_count(metric_name: &str, fixture: impl Future<Output = Fixtur
} = fixture.await;

let metric = labels.metric(metric_name);

let scrape = metrics.get("/metrics").await;
assert!(
metric.is_not_in(scrape),
"{metric:?} should not be in /metrics"
);

info!("client.get(/)");
assert_eq!(client.get("/").await, "hello");

// after seeing a request, the request count should be 1.
metric.value(1u64).assert_in(&metrics).await;
// after seeing a request, the request carry the correct labels
metric.assert_in(&metrics).await;
}

mod response_classification {
Expand Down Expand Up @@ -406,7 +395,6 @@ mod response_classification {
"success"
},
)
.value(1u64)
.assert_in(&metrics)
.await;
}
Expand Down Expand Up @@ -554,9 +542,7 @@ mod outbound_dst_labels {
let metrics = client::http1(proxy.admin, "localhost");

let client = client::new(proxy.outbound, host);
let tcp_labels = metrics::labels()
.label("direction", "outbound")
.label("target_addr", addr);
let tcp_labels = metrics::labels().label("direction", "outbound");
let labels = tcp_labels.clone();
let f = Fixture {
client,
Expand Down Expand Up @@ -700,140 +686,6 @@ mod outbound_dst_labels {
labels.metric(metric).assert_in(&metrics).await;
}
}

// XXX(ver) This test is broken and/or irrelevant. linkerd/linkerd2#751.
#[tokio::test]
#[ignore]
async fn controller_updates_addr_labels() {
let _trace = trace_init();
info!("running test server");

let (
Fixture {
client,
metrics,
proxy: _proxy,
_profile,
dst_tx,
labels,
..
},
addr,
) = fixture("labeled.test.svc.cluster.local").await;
let dst_tx = dst_tx.unwrap();
dst_tx.send(
controller::destination_add(addr)
.addr_label("addr_label", "foo")
.set_label("set_label", "unchanged"),
);

let labels1 = labels
.clone()
.label("dst_addr_label", "foo")
.label("dst_set_label", "unchanged");

info!("client.get(/)");
assert_eq!(client.get("/").await, "hello");

// the first request should be labeled with `dst_addr_label="foo"`
for &metric in &[
"request_total",
"response_total",
"response_latency_ms_count",
] {
labels1.metric(metric).value(1u64).assert_in(&metrics).await;
}

dst_tx.send(
controller::destination_add(addr)
.addr_label("addr_label", "bar")
.set_label("set_label", "unchanged"),
);

let labels2 = labels
.label("dst_addr_label", "bar")
.label("dst_set_label", "unchanged");

info!("client.get(/)");
assert_eq!(client.get("/").await, "hello");

// the second request should increment stats labeled with `dst_addr_label="bar"`
// the first request should be labeled with `dst_addr_label="foo"`
for &metric in &[
"request_total",
"response_total",
"response_latency_ms_count",
] {
labels1.metric(metric).value(1u64).assert_in(&metrics).await;
}

// stats recorded from the first request should still be present.
// the first request should be labeled with `dst_addr_label="foo"`
for &metric in &[
"request_total",
"response_total",
"response_latency_ms_count",
] {
labels2.metric(metric).value(1u64).assert_in(&metrics).await;
}
}

// XXX(ver) This test is broken and/or irrelevant. linkerd/linkerd2#751.
#[ignore]
#[tokio::test]
async fn controller_updates_set_labels() {
let _trace = trace_init();
info!("running test server");
let (
Fixture {
client,
metrics,
proxy: _proxy,
_profile,
dst_tx,
labels,
..
},
addr,
) = fixture("labeled.test.svc.cluster.local").await;
let dst_tx = dst_tx.unwrap();
dst_tx.send(controller::destination_add(addr).set_label("set_label", "foo"));

let labels1 = labels.clone().label("dst_set_label", "foo");

info!("client.get(/)");
assert_eq!(client.get("/").await, "hello");
// the first request should be labeled with `dst_addr_label="foo"
for &metric in &[
"request_total",
"response_total",
"response_latency_ms_count",
] {
labels1.metric(metric).value(1u64).assert_in(&client).await;
}

dst_tx.send(controller::destination_add(addr).set_label("set_label", "bar"));
let labels2 = labels.label("dst_set_label", "bar");

info!("client.get(/)");
assert_eq!(client.get("/").await, "hello");
// the second request should increment stats labeled with `dst_addr_label="bar"`
for &metric in &[
"request_total",
"response_total",
"response_latency_ms_count",
] {
labels2.metric(metric).value(1u64).assert_in(&metrics).await;
}
// stats recorded from the first request should still be present.
for &metric in &[
"request_total",
"response_total",
"response_latency_ms_count",
] {
labels1.metric(metric).value(1u64).assert_in(&metrics).await;
}
}
}

#[tokio::test]
Expand Down Expand Up @@ -1337,8 +1189,7 @@ async fn metrics_compression() {

let mut metric = labels
.metric("response_latency_ms_count")
.label("status_code", 200)
.value(1u64);
.label("status_code", 200);

for &encoding in encodings {
assert_eventually_contains!(do_scrape(encoding).await, &metric);
Expand All @@ -1348,6 +1199,6 @@ async fn metrics_compression() {
assert_eq!(client.get("/").await, "hello");

for &encoding in encodings {
assert_eventually_contains!(do_scrape(encoding).await, metric.set_value(2u64));
assert_eventually_contains!(do_scrape(encoding).await, metric);
}
}
Loading