From d65f61267f62b180c1c9d066c1744d62d9fddf4d Mon Sep 17 00:00:00 2001 From: arloor Date: Fri, 27 Sep 2024 09:31:47 +0800 Subject: [PATCH] aa --- io_x/src/lib.rs | 58 +++++++++++++++++++----------- prom_label/src/lib.rs | 2 +- rust_http_proxy/src/config.rs | 5 +-- rust_http_proxy/src/net_monitor.rs | 16 ++++----- rust_http_proxy/src/proxy.rs | 22 ++++++------ rust_http_proxy/src/web_func.rs | 2 +- 6 files changed, 59 insertions(+), 46 deletions(-) diff --git a/io_x/src/lib.rs b/io_x/src/lib.rs index 9f82428..9e80c7f 100644 --- a/io_x/src/lib.rs +++ b/io_x/src/lib.rs @@ -2,10 +2,10 @@ use std::{fmt::Debug, pin::Pin, task::Context, task::Poll}; use pin_project_lite::pin_project; use prometheus_client::metrics::{counter::Counter, family::Family}; -use tokio::io::AsyncRead; -use tokio::io::AsyncWrite; use std::io; use std::time::Duration; +use tokio::io::AsyncRead; +use tokio::io::AsyncWrite; use futures_util::Future; use tokio::time::{sleep, Instant, Sleep}; @@ -118,7 +118,6 @@ where } } - pin_project! { /// enhance inner tcp stream with prometheus counter #[derive(Debug)] @@ -139,7 +138,7 @@ impl TimeoutIO where T: AsyncWrite + AsyncRead, { - pub fn new(inner: T, timeout:Duration) -> Self { + pub fn new(inner: T, timeout: Duration) -> Self { Self { inner, timeout, @@ -149,7 +148,10 @@ where /// set timeout pub fn _set_timeout_pinned(mut self: Pin<&mut Self>, timeout: Duration) { *self.as_mut().project().timeout = timeout; - self.project().idle_future.as_mut().reset(Instant::now() + timeout); + self.project() + .idle_future + .as_mut() + .reset(Instant::now() + timeout); } } @@ -166,12 +168,15 @@ where let idle_feature = pro.idle_future; let timeout: &mut Duration = pro.timeout; let read_poll = pro.inner.poll_read(cx, buf); - if read_poll.is_ready(){ + if read_poll.is_ready() { // 读到内容或者读到EOF等等,重置计时 idle_feature.reset(Instant::now() + *timeout); - }else if idle_feature.poll(cx).is_ready(){ + } else if idle_feature.poll(cx).is_ready() { // 没有读到内容,且已经timeout,则返回错误 - return Poll::Ready(Err(io::Error::new(io::ErrorKind::TimedOut,format!("read idle for {:?}",timeout)))); + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::TimedOut, + format!("read idle for {:?}", timeout), + ))); } read_poll } @@ -190,10 +195,13 @@ where let idle_feature = pro.idle_future; let timeout: &mut Duration = pro.timeout; let write_poll = pro.inner.poll_write(cx, buf); - if write_poll.is_ready(){ + if write_poll.is_ready() { idle_feature.reset(Instant::now() + *timeout); - }else if idle_feature.poll(cx).is_ready(){ - return Poll::Ready(Err(io::Error::new(io::ErrorKind::TimedOut,format!("write idle for {:?}",timeout)))); + } else if idle_feature.poll(cx).is_ready() { + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::TimedOut, + format!("write idle for {:?}", timeout), + ))); } write_poll } @@ -203,10 +211,13 @@ where let idle_feature = pro.idle_future; let timeout: &mut Duration = pro.timeout; let write_poll = pro.inner.poll_flush(cx); - if write_poll.is_ready(){ + if write_poll.is_ready() { idle_feature.reset(Instant::now() + *timeout); - }else if idle_feature.poll(cx).is_ready(){ - return Poll::Ready(Err(io::Error::new(io::ErrorKind::TimedOut,format!("write idle for {:?}",timeout)))); + } else if idle_feature.poll(cx).is_ready() { + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::TimedOut, + format!("write idle for {:?}", timeout), + ))); } write_poll } @@ -219,10 +230,13 @@ where let idle_feature = pro.idle_future; let timeout: &mut Duration = pro.timeout; let write_poll = pro.inner.poll_shutdown(cx); - if write_poll.is_ready(){ + if write_poll.is_ready() { idle_feature.reset(Instant::now() + *timeout); - }else if idle_feature.poll(cx).is_ready(){ - return Poll::Ready(Err(io::Error::new(io::ErrorKind::TimedOut,format!("write idle for {:?}",timeout)))); + } else if idle_feature.poll(cx).is_ready() { + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::TimedOut, + format!("write idle for {:?}", timeout), + ))); } write_poll } @@ -240,12 +254,14 @@ where let idle_feature = pro.idle_future; let timeout: &mut Duration = pro.timeout; let write_poll = pro.inner.poll_write_vectored(cx, bufs); - if write_poll.is_ready(){ + if write_poll.is_ready() { idle_feature.reset(Instant::now() + *timeout); - }else if idle_feature.poll(cx).is_ready(){ - return Poll::Ready(Err(io::Error::new(io::ErrorKind::TimedOut,format!("write idle for {:?}",timeout)))); + } else if idle_feature.poll(cx).is_ready() { + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::TimedOut, + format!("write idle for {:?}", timeout), + ))); } write_poll } } - diff --git a/prom_label/src/lib.rs b/prom_label/src/lib.rs index 6a7d2c3..b4e7bfa 100644 --- a/prom_label/src/lib.rs +++ b/prom_label/src/lib.rs @@ -12,7 +12,7 @@ pub struct LabelImpl(R) where R: Clone + Debug + Hash + PartialEq + Eq + EncodeLabelSet + 'static; -impl LabelImpl { +impl LabelImpl { pub fn new(s: R) -> Self { Self(s) } diff --git a/rust_http_proxy/src/config.rs b/rust_http_proxy/src/config.rs index 54fa9d6..0428e52 100644 --- a/rust_http_proxy/src/config.rs +++ b/rust_http_proxy/src/config.rs @@ -238,10 +238,7 @@ fn log_config(config: &Config) { for ele in reverse_proxy_config.1 { info!( " {:<70} -> {}{}**", - format!( - "*://{}:*{}**", - reverse_proxy_config.0, ele.location - ), + format!("*://{}:*{}**", reverse_proxy_config.0, ele.location), ele.upstream.scheme_and_authority, ele.upstream.replacement ); diff --git a/rust_http_proxy/src/net_monitor.rs b/rust_http_proxy/src/net_monitor.rs index b5a48c3..d8fc3a0 100644 --- a/rust_http_proxy/src/net_monitor.rs +++ b/rust_http_proxy/src/net_monitor.rs @@ -25,7 +25,7 @@ pub(crate) const IGNORED_INTERFACES: [&str; 7] = pub struct NetMonitor { buffer: Arc>>, - #[cfg(feature = "bpf")] + #[cfg(all(target_os = "linux", feature = "bpf"))] cgroup_transmit_counter: cgroup_traffic::CgroupTransmitCounter, } const TOTAL_SECONDS: u64 = 900; @@ -35,17 +35,17 @@ impl NetMonitor { pub fn new() -> Result { Ok(NetMonitor { buffer: Arc::new(RwLock::new(VecDeque::::new())), - #[cfg(feature = "bpf")] + #[cfg(all(target_os = "linux", feature = "bpf"))] cgroup_transmit_counter: cgroup_traffic::init_self_cgroup_skb_monitor()?, }) } - #[cfg(feature = "bpf")] + #[cfg(all(target_os = "linux", feature = "bpf"))] pub(crate) fn get_cgroup_egress(&self) -> u64 { self.cgroup_transmit_counter.get_egress() } - #[cfg(feature = "bpf")] + #[cfg(all(target_os = "linux", feature = "bpf"))] pub(crate) fn get_cgroup_ingress(&self) -> u64 { self.cgroup_transmit_counter.get_ingress() } @@ -88,9 +88,9 @@ impl NetMonitor { } } -#[cfg(feature = "bpf")] +#[cfg(all(target_os = "linux", feature = "bpf"))] use socket_filter::TransmitCounter; -#[cfg(feature = "bpf")] +#[cfg(all(target_os = "linux", feature = "bpf"))] static SOCKET_FILTER: std::sync::LazyLock> = std::sync::LazyLock::new(|| { Arc::new(TransmitCounter::new( &IGNORED_INTERFACES, @@ -98,11 +98,11 @@ static SOCKET_FILTER: std::sync::LazyLock> = std::sync::Laz )) }); -#[cfg(feature = "bpf")] +#[cfg(all(target_os = "linux", feature = "bpf"))] pub fn get_egress() -> u64 { SOCKET_FILTER.get_egress() } -#[cfg(feature = "bpf")] +#[cfg(all(target_os = "linux", feature = "bpf"))] pub fn get_ingress() -> u64 { SOCKET_FILTER.get_ingress() } diff --git a/rust_http_proxy/src/proxy.rs b/rust_http_proxy/src/proxy.rs index 6e197e0..39185dc 100644 --- a/rust_http_proxy/src/proxy.rs +++ b/rust_http_proxy/src/proxy.rs @@ -16,6 +16,8 @@ use crate::{ }; use {io_x::CounterIO, io_x::TimeoutIO, prom_label::LabelImpl}; +#[cfg(target_os = "linux")] +use crate::net_monitor::NetMonitor; use http::{ header::{HOST, LOCATION}, Uri, @@ -45,8 +47,6 @@ use prometheus_client::{ }; use rand::Rng; use tokio::{net::TcpStream, pin}; -#[cfg(target_os = "linux")] -use crate::net_monitor::NetMonitor; pub struct ProxyHandler { pub(crate) config: Config, @@ -62,9 +62,9 @@ pub struct ProxyHandler { pub(crate) struct Metrics { pub(crate) http_req_counter: Family, Counter>, pub(crate) proxy_traffic: Family, Counter>, - #[cfg(feature = "bpf")] + #[cfg(all(target_os = "linux", feature = "bpf"))] pub(crate) net_bytes: Family, Counter>, - #[cfg(feature = "bpf")] + #[cfg(all(target_os = "linux", feature = "bpf"))] pub(crate) cgroup_bytes: Family, Counter>, } const DEFAULT_HOST: &str = "default_host"; @@ -410,7 +410,7 @@ impl ProxyHandler { } } - #[cfg(feature = "bpf")] + #[cfg(all(target_os = "linux", feature = "bpf"))] pub(crate) fn snapshot_metrics(&self) { { self.metrics @@ -719,17 +719,17 @@ fn register_metrics(registry: &mut Registry) -> Metrics { ); let proxy_traffic = Family::, Counter>::default(); registry.register("proxy_traffic", "num proxy_traffic", proxy_traffic.clone()); - #[cfg(feature = "bpf")] + #[cfg(all(target_os = "linux", feature = "bpf"))] let net_bytes = Family::, Counter>::default(); - #[cfg(feature = "bpf")] + #[cfg(all(target_os = "linux", feature = "bpf"))] registry.register( "net_bytes", "num hosts net traffic in bytes", net_bytes.clone(), ); - #[cfg(feature = "bpf")] + #[cfg(all(target_os = "linux", feature = "bpf"))] let cgroup_bytes = Family::, Counter>::default(); - #[cfg(feature = "bpf")] + #[cfg(all(target_os = "linux", feature = "bpf"))] registry.register( "cgroup_bytes", "num this cgroup's net traffic in bytes", @@ -742,9 +742,9 @@ fn register_metrics(registry: &mut Registry) -> Metrics { Metrics { http_req_counter, proxy_traffic, - #[cfg(feature = "bpf")] + #[cfg(all(target_os = "linux", feature = "bpf"))] net_bytes, - #[cfg(feature = "bpf")] + #[cfg(all(target_os = "linux", feature = "bpf"))] cgroup_bytes, } } diff --git a/rust_http_proxy/src/web_func.rs b/rust_http_proxy/src/web_func.rs index d60a459..0b20c45 100644 --- a/rust_http_proxy/src/web_func.rs +++ b/rust_http_proxy/src/web_func.rs @@ -95,7 +95,7 @@ pub async fn serve_http_request( ) { return Ok(build_authenticate_resp(false)); } - #[cfg(feature = "bpf")] + #[cfg(all(target_os = "linux", feature = "bpf"))] proxy_handler.snapshot_metrics(); serve_metrics(&proxy_handler.prom_registry, can_gzip).await }